package kafka

import (
	"github.com/Shopify/sarama"
)

// Offset sentinels re-exported from sarama under this package's name.
// NOTE(review): presumably meant to be passed as the offset argument of
// Consumer.ConsumePartition; see the sarama documentation for the exact
// semantics of each value.
const (
	// OffsetOldest is an alias for sarama.OffsetOldest.
	OffsetOldest = sarama.OffsetOldest
	// OffsetNewest is an alias for sarama.OffsetNewest.
	OffsetNewest = sarama.OffsetNewest
)

// ConsumerMessage is this package's own representation of a consumed Kafka
// message. getMessage fills it from the same-named fields of a
// sarama.ConsumerMessage.
type ConsumerMessage struct {
	// Key holds the message key bytes.
	Key []byte
	// Value holds the message payload bytes.
	Value []byte
	// Topic is the name of the topic the message belongs to.
	Topic string
	// Partition is the partition number of the message.
	Partition int32
	// Offset is the message offset.
	Offset int64
}

// getMessage converts a sarama.ConsumerMessage into this package's
// ConsumerMessage, carrying over the key, value, topic, partition and offset.
//
// The Key and Value byte slices are assigned directly, so the result shares
// their backing arrays with the input message.
//
// A nil message yields a nil result rather than a nil-pointer panic.
func getMessage(message *sarama.ConsumerMessage) *ConsumerMessage {
	// A receive from a closed channel of *sarama.ConsumerMessage produces a nil
	// pointer; guard so such a value does not crash the conversion.
	if message == nil {
		return nil
	}
	return &ConsumerMessage{
		Key:       message.Key,
		Value:     message.Value,
		Topic:     message.Topic,
		Partition: message.Partition,
		Offset:    message.Offset,
	}
}

// Consumer is this package's consumer abstraction. In this file it is
// implemented by *consumer, which delegates every call to a sarama.Consumer.
type Consumer interface {
	// Topics returns a list of topic names, or an error.
	Topics() ([]string, error)
	// Partitions returns the partition IDs for the given topic, or an error.
	Partitions(topic string) ([]int32, error)
	// ConsumePartition returns a PartitionConsumer for the given topic and
	// partition at the given offset, or an error.
	// NOTE(review): offset presumably also accepts OffsetOldest/OffsetNewest;
	// confirm against the sarama documentation.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
}

// consumer adapts a sarama.Consumer to this package's Consumer interface.
type consumer struct {
	// consumer is the wrapped sarama consumer that the methods in this file
	// delegate to.
	consumer sarama.Consumer
}

// ConsumePartition asks the wrapped sarama consumer for a partition consumer
// on the given topic, partition and offset, and wraps the result in this
// package's partitionConsumer.
//
// Any error from sarama is returned unchanged, together with a nil
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	inner, err := c.consumer.ConsumePartition(topic, partition, offset)
	if err != nil {
		// Return a literal nil so the interface value itself is nil.
		return nil, err
	}

	wrapped := &partitionConsumer{partitionConsumer: inner}
	return wrapped, nil
}

// Topics forwards to the wrapped sarama consumer's Topics method and returns
// its results unchanged.
func (c *consumer) Topics() ([]string, error) {
	names, err := c.consumer.Topics()
	return names, err
}

// Partitions forwards to the wrapped sarama consumer's Partitions method for
// the given topic and returns its results unchanged.
func (c *consumer) Partitions(topic string) ([]int32, error) {
	ids, err := c.consumer.Partitions(topic)
	return ids, err
}
